package analytics

import (
	baseLog "gitee.com/zaiqiang231/go-base-app/base_app/log"
	"gitee.com/zaiqiang231/lovesport-authz-service/app/config"
	"gitee.com/zaiqiang231/lovesport-authz-service/app/store"
	"github.com/vmihailenco/msgpack/v5"
	"sync"
	"sync/atomic"
	"time"
)

const AnalyticsKeyName = "lovesport-system-analytics"
const (
	recordsBufferForcedFlushInterval = 1 * time.Second
)

type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"`
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
	expiry := time.Duration(expiresInSeconds) * time.Second
	if expiresInSeconds == 0 {
		// Expiry is set to 100 years
		expiry = 24 * 365 * 100 * time.Hour
	}

	t := time.Now()
	t2 := t.Add(expiry)
	a.ExpireAt = t2
}

type Analytics struct {
	poolSize                   int
	recordsChan                chan *AnalyticsRecord
	workerBufferSize           uint64
	recordsBufferFlushInterval time.Duration
	storageExpirationTime      time.Duration
	shouldStop                 uint32
	poolWg                     sync.WaitGroup
}

var analytics *Analytics

func NewAnalytics(options *config.AnalyticsConfig) *Analytics {
	ps := options.PoolSize
	recordsBufferSize := options.RecordsBufferSize
	workerBufferSize := recordsBufferSize / uint64(ps)

	recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)

	analytics = &Analytics{
		poolSize:                   ps,
		recordsChan:                recordsChan,
		workerBufferSize:           workerBufferSize,
		recordsBufferFlushInterval: options.FlushInterval,
		storageExpirationTime:      options.StorageExpirationTime,
	}

	return analytics
}

func GetAnalytics() *Analytics {
	return analytics
}

func (r *Analytics) Start() {
	// start worker pool
	atomic.SwapUint32(&r.shouldStop, 0)
	for i := 0; i < r.poolSize; i++ {
		r.poolWg.Add(1)
		go r.recordWorker()
	}
}

func (r *Analytics) Stop() {
	// flag to stop sending records into channel
	atomic.SwapUint32(&r.shouldStop, 1)
	// close channel to stop workers
	close(r.recordsChan)
	// wait for all workers to be done
	r.poolWg.Wait()
}

func (r *Analytics) RecordHit(record *AnalyticsRecord) error {
	// check if we should stop sending records 1st
	if atomic.LoadUint32(&r.shouldStop) > 0 {
		return nil
	}
	// just send record to channel consumed by pool of workers
	// leave all data crunching and Redis I/O work for pool workers
	r.recordsChan <- record
	return nil
}

func (r *Analytics) recordWorker() {
	defer r.poolWg.Done()
	// this is buffer to send one pipelined command to redis
	// use r.recordsBufferSize as cap to reduce slice re-allocations
	recordsBuffer := make([][]byte, 0, r.workerBufferSize)

	// read records from channel and process
	lastSentTS := time.Now()
	for {
		var readyToSend bool
		select {
		case record, ok := <-r.recordsChan:
			// check if channel was closed and it is time to exit from worker
			if !ok {
				// send what is left in buffer
				store.AppendToSetPipelined(AnalyticsKeyName, recordsBuffer, r.storageExpirationTime)
				return
			}
			// we have new record - prepare it and add to buffer

			if encoded, err := msgpack.Marshal(record); err != nil {
				baseLog.Errorf("Error encoding analytics data: %s", err.Error())
			} else {
				recordsBuffer = append(recordsBuffer, encoded)
			}

			// identify that buffer is ready to be sent
			readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize

		case <-time.After(time.Duration(r.recordsBufferFlushInterval) * time.Millisecond):
			// nothing was received for that period of time
			// anyways send whatever we have, don't hold data too long in buffer
			readyToSend = true
		}

		// send data to Redis and reset buffer
		if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTS) >= recordsBufferForcedFlushInterval) {
			store.AppendToSetPipelined(AnalyticsKeyName, recordsBuffer, r.storageExpirationTime)
			recordsBuffer = recordsBuffer[:0]
			lastSentTS = time.Now()
		}
	}
}
